-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner #21948
[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner #21948
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we can also fix the issue by involving a read lock to the addReplicationCluster()
method in PersistentTopic.java without changing the close topic process. The replicator.disconnect()
method will also call the managed cursor API to change the cursor state which doesn't make sense to close the managed ledger before closing the replicator
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9baafcb2e9..db6910eb51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1918,11 +1918,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
return;
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
+ lock.readLock().lock();
try {
- return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
- remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+ if (!isClosingOrDeleting) {
+ return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
+ remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+ }
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+ } finally {
+ lock.readLock().unlock();
}
return null;
});
8d24801
to
0723666
Compare
0723666
to
c163106
Compare
c163106
to
1506b48
Compare
… an orphan replicator in the previous topic owner
1506b48
to
d3b00d2
Compare
Rebase master |
Good suggestion, already improved the code. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #21948 +/- ##
============================================
+ Coverage 73.57% 73.95% +0.38%
+ Complexity 32624 2744 -29880
============================================
Files 1877 1885 +8
Lines 139502 140512 +1010
Branches 15299 15450 +151
============================================
+ Hits 102638 103916 +1278
+ Misses 28908 28563 -345
- Partials 7956 8033 +77
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@poorbarcode I wonder if these changes somehow cause OOME problems with org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testWriteMarkerTaskOfReplicateSubscriptions, please see |
@poorbarcode I can confirm that this PR is causing problems which most likely lead to OOMEs. Reported as #22588.
then checkout d475655 which is the commit before b774666 (#21948). you can see that the behavior is very different:
|
} | ||
|
||
private void unfenceReplicatorsToResume() { | ||
checkReplication(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can lead to a loop
checkReplication -> fail to delete replicated topic -> unfenceTopicToResume ->checkReplication
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2024-04-25T14:57:46,573 - INFO - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - [persistent://pulsar/global/removeClusterTest/__change_events] Error deleting topic
org.apache.pulsar.broker.service.BrokerServiceException$PersistenceException: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
at org.apache.pulsar.broker.service.persistent.PersistentTopic$6.deleteLedgerFailed(PersistentTopic.java:1496) ~[classes/:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncDelete$33(ManagedLedgerImpl.java:2950) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2947) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
Caused by: java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
... 19 more
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
2024-04-25T14:57:46,573 - INFO - [broker-topic-workers-OrderedExecutor-2-0:PersistentTopic] - Deleting topic [persistent://pulsar/global/removeClusterTest/__change_events] because local cluster is not part of global namespace repl list [r2, r3]
2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r2] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO - [broker-topic-workers-OrderedExecutor-2-0:BrokerService] - Successfully delete authentication policies for topic persistent://pulsar/global/removeClusterTest/__change_events
2024-04-25T14:57:46,573 - INFO - [ForkJoinPool.commonPool-worker-4:AbstractReplicator] - [persistent://pulsar/global/removeClusterTest/__change_events | r1-->r3] Skip current termination since other thread is doing termination, state : Terminated
2024-04-25T14:57:46,573 - INFO - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - pulsar/global/removeClusterTest/persistent/__change_events Moving to FencedForDeletion state
2024-04-25T14:57:46,573 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:ManagedLedgerImpl] - [pulsar/global/removeClusterTest/persistent/__change_events] Error truncating ledger for deletion
java.util.concurrent.CompletionException: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.andTree(CompletableFuture.java:1527) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.allOf(CompletableFuture.java:2451) ~[?:?]
at org.apache.pulsar.common.util.FutureUtil.waitForAll(FutureUtil.java:56) ~[pulsar-common-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncTruncate(ManagedLedgerImpl.java:4341) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncDelete(ManagedLedgerImpl.java:2946) ~[managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$40(PersistentTopic.java:1468) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$41(PersistentTopic.java:1462) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$42(PersistentTopic.java:1453) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2179) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$34(PersistentTopic.java:1431) ~[classes/:?]
at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:787) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[bookkeeper-common-4.17.0.jar:4.17.0]
at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[bookkeeper-common-4.17.0.jar:4.17.0]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
at java.base/java.lang.Thread.run(Thread.java:1570) [?:?]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException: Cursor was already closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this also means we need to think about how to skip deleting ledgers when the cursor is closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is caused by the method topic.close
is a reentrant method, the issue steps are like the following
reset namespace level policies -> checkReplication -> delete topic
unload namespace -> unload topic -> close topic
The method topic.close
can be run even if the deleting task is in-progress, it is not correct, the PR #17524 will fix it, please review it again, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define enum state for topic and reject/skip any invalid state transition (fail/skip fast)?
For example, delete should skip if closed, and vice-versa.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with you, we need more than one separate PRs to merge the states fenced, close, deleting
, just like we discussed here
… an orphan replicator in the previous topic owner (apache#21948) (cherry picked from commit b774666) (cherry picked from commit 6038bbf)
… an orphan replicator in the previous topic owner (apache#21948) (cherry picked from commit b774666) (cherry picked from commit 6038bbf)
Motivation
There is a race condition that makes an orphan replicator in the original owner of a topic, and causes the new owner of the topic can not start a replicator due to
org.apache.pulsar.broker.service.BrokerServiceException$NamingException Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic
.Scenario 1
Scenario 2
replication_clusters
.After we solved the scenario 1 by #21946, the current PR is focusing on the scenario 2:
Current PR is focusing on Scenario 2.
Steps of Scenario 2
thread enable replication
unload bundle
closing
replicator.disconnect()
becausetopic.replicators
is emptypulsar.repl
replicator.stat --> Starting
replicator.stat --> Started
readMoreEntries
, since there is no entries to read, just pending this requestpulsar.repl
Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic
Since the scenario is too complex, I can not add a test.
TODO: reproduce the Scenario 2 locally.
Modifications
replicators.disconnect
after the managed ledger is closed. It would prevent the new cursor(pulsar.dedup
) from being created.topic.close
will be done afterreplicators.disconnect
, it can avoid the new replicator on the next owner broker of the topic failing due to creating an internal producer failedorg.apache.pulsar.broker.service.BrokerServiceException$NamingException Producer with name 'pulsar.repl.{local_cluster}-->{remote_cluster}' is already connected to topic
.replicator.producer.close
will no longer fail.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x